package net.bwie.dwd;


import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;


public class DwdCarsLog {
    public static void main(String[] args) throws Exception {
        // 1.表执行环境
        TableEnvironment tableEnv = getTableEnv();

        // 2.输入表 读取Kafka数据
        createInputTable(tableEnv);

        // 3.数据处理
        Table resultTable = handle(tableEnv);

        // 4.映射表 写入Kafka表中
        createOutputTable(tableEnv);

        // 5.保存数据
        saveToSink(tableEnv, resultTable);

    }

    private static void saveToSink(TableEnvironment tableEnv, Table resultTable) {
        // todo 创建临时视图 然后写入Kafka中
        tableEnv.createTemporaryView("carsLog", resultTable);
        tableEnv.executeSql(
                "insert into dwd_summary_cars_log " +
                        "select id,owerId,opTime,cId,carCode,ctype from carsLog"
        );
    }

    private static void createOutputTable(TableEnvironment stream) {
        // todo 创建Kafka表
        stream.executeSql(
                "create table dwd_summary_cars_log(\n" +
                        "    id STRING,\n" +
                        "    owerId BIGINT,\n" +
                        "    opTime TIMESTAMP(3),\n" +
                        "    cId BIGINT,\n" +
                        "    carCode STRING,\n" +
                        "    ctype STRING,\n" +
                        "    PRIMARY KEY (id) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        "    'connector' = 'upsert-kafka',\n" +
                        "    'topic' = 'dwd_summary_cars_log',\n" +
                        "    'properties.bootstrap.servers' = 'node101:9092',\n" +
                        "    'key.format' = 'json',\n" +
                        "    'value.format' = 'json'\n" +
                        ")"
        );
    }

    private static Table handle(TableEnvironment stream) {
        // todo 查询
        Table searchLogTable = stream.sqlQuery(
                "SELECT\n" +
                        "  w.id,\n" +
                        "  COALESCE(d.owerId, 1) AS owerId,\n" +
                        "  w.opTime,\n" +
                        "  w.cId,\n" +
                        "  w.carCode,\n" +
                        "  w.ctype\n" +
                        "FROM\n" +
                        "  ods_cars_log_clean AS w\n" +
                        "LEFT JOIN ods_cars_clean as d\n" +
                        "ON w.carCode = d.carCode"
        );

        return searchLogTable;
    }


    private static void createInputTable(TableEnvironment stream) {
        // todo 创建输入表 车辆进出事实表 ods_cars_log_clean
        stream.executeSql(
                "CREATE TABLE ods_cars_log_clean (\n" +
                        " id STRING,\n" +
                        " opTime TIMESTAMP(3),\n" +
                        " ctype STRING,\n" +
                        " carCode STRING,\n" +
                        " cId BIGINT,\n" +
                        " proc_time AS PROCTIME(),\n" +
                        " WATERMARK FOR opTime AS opTime - INTERVAL '5' MINUTE\n" +
                        ") WITH (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'ods-cars-Log',\n" +
                        " 'properties.bootstrap.servers' = 'node101:9092',\n" +
                        " 'properties.group.id' = 'ods_cars_log_clean',\n" +
                        " 'scan.startup.mode' = 'earliest-offset',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );
//        stream.executeSql("SELECT * FROM ods_cars_log_clean").print();
        // todo 创建输入表 车辆维度表 ods_cars_clean
        stream.executeSql(
                "CREATE TABLE ods_cars_clean(\n" +
                        "    id INT,\n" +
                        "    owerId INT,\n" +
                        "    carCode STRING,\n" +
                        "    carColor STRING,\n" +
                        "    type STRING,\n" +
                        "    remark STRING\n" +
                        ") WITH ( \n" +
                        "    'connector' = 'doris',\n" +
                        "    'fenodes' = 'node102:8030',\n" +
                        "    'table.identifier' = 'sca_doris.cars',\n" +
                        "    'username' = 'root',\n" +
                        "    'password' = '123456'\n" +
                        ")"
        );
//        stream.executeSql("SELECT * FROM ods_cars_clean").print();
    }

    public static TableEnvironment getTableEnv() {
        // 1.环境属性设置
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        // 2.配置属性设置
        Configuration configuration = tabEnv.getConfig().getConfiguration();
        configuration.setString("table.local-time-zone", "Asia/Shanghai");
        configuration.setString("table.exec.resource.default-parallelism", "1");
        configuration.setString("table.exec.state.ttl", "5 s");
        configuration.setString("execution.checkpointing.interval", "5 s");
        // 3.返回对象
        return tabEnv;
    }

}
